Skip to content

[core] Share JDBC connection pool across catalog instances in same JVM#8323

Open
nickdelnano wants to merge 4 commits into
apache:masterfrom
nickdelnano:nickdelnano/jdbc-cached-client-pool
Open

[core] Share JDBC connection pool across catalog instances in same JVM#8323
nickdelnano wants to merge 4 commits into
apache:masterfrom
nickdelnano:nickdelnano/jdbc-cached-client-pool

Conversation

@nickdelnano

@nickdelnano nickdelnano commented Jun 22, 2026

Copy link
Copy Markdown
Contributor

Summary

  • Introduces CachedJdbcClientPool, a static cache that shares JdbcClientPool instances across all JdbcCatalog instances in the same JVM, keyed by (JDBC URI, catalog-key).
  • Mirrors the existing pattern used by Hive's CachedClientPool for HMS Thrift connections.
  • Reduces JDBC connections from O(parallelism × operators × pool_size) to O(TaskManagers × pool_size) in Flink CDC jobs.

For high parallelism jobs this reduces jdbc connection count by 8x or more.

Motivation

In Flink CDC sync jobs, every operator subtask that calls catalogLoader.load() creates a new JdbcCatalog with its own dedicated connection pool (default size: 2). With parallelism 16, a single job creates ~128 JDBC connections across parser, schema evolution, writer, and committer operators. The HiveCatalog avoids this via CachedClientPool — a static Caffeine cache that shares a single HMS client pool across all catalog instances in the same JVM.

This PR applies this pattern to JdbcCatalog.

Design Decision: ConcurrentHashMap vs Caffeine

HiveCatalog uses Caffeine with expireAfterAccess and avoids premature eviction by never exposing the raw pool — CachedClientPool implements ClientPool and delegates every run() call through clientPoolCache.get(key), which refreshes the access timer on each operation.

Using Caffeine for JDBC would require a larger change because all 16 methods in JdbcUtils are typed to accept JdbcClientPool, not the ClientPool interface. If using Caffeine is preferred I will do that instead. I have started with the simpler change. Please let me know.

Instead we use a ConcurrentHashMap with no time-based eviction. Pools live for the JVM lifetime and are closed via a shutdown hook. This is safe because:

  • Connection pools are lightweight to hold
  • In Flink, catalogs are long-lived (created at job start, used for hours)
  • The shutdown hook ensures deterministic cleanup

Changes

  • CachedJdbcClientPool (new): Static ConcurrentHashMap<Key, JdbcClientPool> keyed on (URI, catalog-key). JVM shutdown hook closes all pools.
  • JdbcCatalog: Uses CachedJdbcClientPool.get() instead of creating a dedicated pool. close() no longer closes the shared pool.
  • JdbcCatalogLockContext: Same — uses the shared cached pool.

Test plan

  • JdbcCatalogTest (70 tests) — passes
  • JdbcClientPoolTest (4 tests) — passes
  • CachedJdbcClientPoolTest (6 tests) — new tests for pool sharing semantics

Previously, every call to catalogLoader.load() created a new JdbcCatalog
with its own dedicated JdbcClientPool (default 2 connections). In Flink
CDC jobs, each operator subtask (parser, writer, committer) creates its
own catalog, resulting in O(parallelism * operators * pool_size) JDBC
connections per job.

This mirrors the pattern used by HiveCatalog's CachedClientPool: a
static Caffeine cache keyed on (JDBC URI, catalog-key) shares a single
JdbcClientPool across all catalog instances in the same TaskManager JVM.
Idle pools are evicted after a configurable interval (default 5 min).
Verifies that:
- Same URI + catalog-key returns the same pool instance
- Different URI returns a different pool instance
- Different catalog-key returns a different pool instance
- The shared pool is usable (connections work)
- Multiple JdbcCatalog instances share the same underlying pool
- resetCache() clears all cached pools
@nickdelnano nickdelnano force-pushed the nickdelnano/jdbc-cached-client-pool branch from 7a048ad to 2d7225b Compare June 22, 2026 18:19
Fixes 6 issues found in code review:

1. Pool evicted while active: Caffeine's expireAfterAccess would close the
   pool after 5 minutes even if the catalog was still using it, because
   the catalog stored the raw pool and never refreshed the cache access
   time. Fix: use ConcurrentHashMap with no eviction — pools live for
   the JVM lifetime.

2. Race in init(): synchronized on 'this' but guarding a static field.
   Fix: eliminated entirely — ConcurrentHashMap.computeIfAbsent is
   thread-safe.

3. Credentials not in cache key: catalogs with same URI but different
   users would share a pool. Fix: include user:password in the key.

4. First-initializer-wins for eviction config: no longer relevant since
   there's no eviction configuration.

5. close() was a no-op with no graceful shutdown: Fix: added a JVM
   shutdown hook that closes all pools.

6. No Caffeine Scheduler meant expired entries lingered: no longer
   relevant since we don't use Caffeine.
@nickdelnano nickdelnano force-pushed the nickdelnano/jdbc-cached-client-pool branch from 2d7225b to a5731b8 Compare June 22, 2026 18:24
When close() is called while a run() action is in-flight, the connection
is returned to the deque after close() has already drained it. The
connection is then orphaned and never closed.

Fix: after returning the client to the deque, check if close() raced us
(this.clients == null). If so, remove the client we just added and close
it directly.

Credit: extracted from apache#8268.
this.dbUrl = options.get(URI);
this.poolSize = options.get(CLIENT_POOL_SIZE);
this.props = props;
this.key = Key.of(dbUrl, options.get(JdbcCatalogOptions.CATALOG_KEY));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This key does not include the JDBC connection properties used by JdbcClientPool (props is later passed to JdbcUtils.extractJdbcConfiguration, including jdbc.user / jdbc.password). If two catalogs in the same JVM use the same URI and catalog-key but different credentials, the second catalog will reuse the pool created by the first one and run with the wrong database user. Please include the effective JDBC connection properties in the cache key (and add a regression test with the same URI/catalog-key but different jdbc.user or jdbc.password).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants